import { SpanKind } from "@opentelemetry/api";
import { getLangWatchTracer } from "langwatch";
import { createLogger } from "~/utils/logger";
import type { AggregateType } from "../domain/aggregateType";
import type { Event, Projection } from "../domain/types";
import type { EventHandlerDefinition } from "../eventHandler.types";
import type { ProjectionDefinition } from "../projection.types";
import type { EventPublisher } from "../publishing/eventPublisher.types";
import type { ProcessorCheckpointStore } from "../stores/eventHandlerCheckpointStore.types";
import type {
  EventStore,
  EventStoreReadContext,
} from "../stores/eventStore.types";
import { EventUtils } from "../utils/event.utils";
import { CheckpointManager } from "./checkpoints/checkpointManager";
import { ConfigurationError } from "./errorHandling";
import type {
  EventSourcingOptions,
  EventSourcingServiceOptions,
  ReplayEventsOptions,
  UpdateProjectionOptions,
} from "./eventSourcingService.types";
import { DEFAULT_UPDATE_LOCK_TTL_MS } from "./eventSourcingService.types";
import { EventHandlerDispatcher } from "./handlers/eventHandlerDispatcher";
import { ProjectionUpdater } from "./projections/projectionUpdater";
import { QueueProcessorManager } from "./queues/queueProcessorManager";
import { EventProcessorValidator } from "./validation/eventProcessorValidator";

/**
 * Main service that orchestrates event sourcing.
 * Coordinates between event stores, projection stores, and event handlers.
 */
export class EventSourcingService<
  EventType extends Event = Event,
  ProjectionType extends Projection = Projection,
> {
  private readonly tracer = getLangWatchTracer(
    "langwatch.trace-processing.event-sourcing-service",
  );
  private readonly logger: ReturnType<typeof createLogger>;

  private readonly pipelineName: string;
  private readonly aggregateType: AggregateType;
  private readonly eventStore: EventStore<EventType>;
  private readonly projections?: Map<
    string,
    ProjectionDefinition<EventType, any>
  >;
  private readonly eventPublisher?: EventPublisher<EventType>;
  private readonly eventHandlers?: Map<
    string,
    EventHandlerDefinition<EventType>
  >;
  private readonly options: EventSourcingOptions<EventType>;
  private readonly queueManager: QueueProcessorManager<EventType>;
  private readonly handlerDispatcher: EventHandlerDispatcher<EventType>;
  private readonly projectionUpdater: ProjectionUpdater<EventType>;

  constructor({
    pipelineName,
    aggregateType,
    eventStore,
    projections,
    eventPublisher,
    eventHandlers,
    processorCheckpointStore,
    serviceOptions,
    logger,
    distributedLock,
    updateLockTtlMs = DEFAULT_UPDATE_LOCK_TTL_MS,
    handlerLockTtlMs = 30000,
    queueProcessorFactory,
  }: EventSourcingServiceOptions<EventType, ProjectionType> & {
    processorCheckpointStore?: ProcessorCheckpointStore;
  }) {
    this.pipelineName = pipelineName;
    this.aggregateType = aggregateType;
    this.eventStore = eventStore;
    this.projections = projections
      ? new Map(Object.entries(projections))
      : void 0;
    this.eventPublisher = eventPublisher;
    this.eventHandlers = eventHandlers
      ? new Map(Object.entries(eventHandlers))
      : void 0;
    this.options = serviceOptions ?? {};
    this.logger =
      logger ??
      createLogger("langwatch.trace-processing.event-sourcing-service");

    // Warn in production if distributed lock is not provided
    if (process.env.NODE_ENV === "production" && !distributedLock) {
      this.logger.warn(
        {
          aggregateType,
        },
        "[SECURITY] EventSourcingService initialized without distributed lock in production. Concurrent updates of the same aggregate projection may result in lost updates (last write wins). Consider providing a DistributedLock implementation.",
      );
    }

    // Warn in production if queue factory is not provided (handlers will be synchronous)
    if (
      process.env.NODE_ENV === "production" &&
      !queueProcessorFactory &&
      eventHandlers &&
      Object.keys(eventHandlers).length > 0
    ) {
      this.logger.warn(
        {
          aggregateType,
        },
        "[PERFORMANCE] EventSourcingService initialized without queue processor factory in production. Event handlers will be executed synchronously, blocking event storage. Consider providing a QueueProcessorFactory for async processing.",
      );
    }

    // Warn in production if queue factory is not provided (projections will be synchronous)
    if (
      process.env.NODE_ENV === "production" &&
      !queueProcessorFactory &&
      projections &&
      Object.keys(projections).length > 0
    ) {
      this.logger.warn(
        {
          aggregateType,
        },
        "[PERFORMANCE] EventSourcingService initialized without queue processor factory in production. Projections will be executed synchronously, blocking event storage. Consider providing a QueueProcessorFactory for async processing.",
      );
    }

    // Initialize components
    const validator = new EventProcessorValidator({
      eventStore,
      aggregateType,
      processorCheckpointStore,
      pipelineName: this.pipelineName,
    });

    const checkpointManager = new CheckpointManager(
      this.pipelineName,
      processorCheckpointStore,
    );

    this.queueManager = new QueueProcessorManager<EventType>({
      aggregateType,
      queueProcessorFactory,
    });

    this.handlerDispatcher = new EventHandlerDispatcher<EventType>({
      aggregateType,
      eventHandlers: this.eventHandlers,
      processorCheckpointStore,
      validator,
      checkpointManager,
      queueManager: this.queueManager,
      distributedLock,
      handlerLockTtlMs,
    });

    this.projectionUpdater = new ProjectionUpdater<EventType>({
      aggregateType,
      eventStore,
      projections: this.projections,
      processorCheckpointStore,
      distributedLock,
      updateLockTtlMs,
      ordering: this.options.ordering ?? "timestamp",
      validator,
      checkpointManager,
      queueManager: this.queueManager,
    });

    // Initialize queue processors for event handlers if factory is provided
    if (queueProcessorFactory && eventHandlers) {
      this.queueManager.initializeHandlerQueues(
        eventHandlers,
        async (handlerName, event, context) => {
          const handlerDef = this.eventHandlers?.get(handlerName);
          if (!handlerDef) {
            throw new ConfigurationError(
              "EventSourcingService",
              `Handler "${handlerName}" not found`,
              { handlerName },
            );
          }
          await this.handlerDispatcher.handleEvent(
            handlerName,
            handlerDef,
            event,
            context,
          );
        },
      );
    }

    // Initialize queue processors for projections if factory is provided
    if (queueProcessorFactory && projections) {
      this.queueManager.initializeProjectionQueues(
        projections,
        async (projectionName, event, context) => {
          const projectionDef = this.projections?.get(projectionName);
          if (!projectionDef) {
            throw new ConfigurationError(
              "EventSourcingService",
              `Projection "${projectionName}" not found`,
              { projectionName },
            );
          }
          await this.projectionUpdater.processProjectionEvent(
            projectionName,
            projectionDef,
            event,
            context,
          );
        },
      );
    }
  }

  /**
   * Stores events using the pipeline's aggregate type.
   *
   * This method automatically uses the aggregate type configured for this pipeline,
   * preventing copy/paste mistakes where the wrong aggregate type is passed.
   *
   * **Execution Flow:**
   * 1. Events are stored in the event store (must succeed)
   * 2. Events are published to the event publisher (if configured) - errors are logged but don't fail
   * 3. Events are dispatched to event handlers (if configured) - errors are logged but don't fail
   * 4. Projections are automatically updated - errors are logged but don't fail
   *
   * **Concurrency:** Safe for concurrent calls with different aggregateIds. Concurrent calls for the same
   * aggregateId are safe at the event store level, but projection updates may conflict (use distributedLock).
   *
   * **Failure Modes:**
   * - Event store failures throw and prevent storage
   * - Publisher/handler/projection failures are logged but don't prevent storage
   * - Invalid tenantId throws before any operations
   *
   * **Performance:** O(n) where n is the number of events. Projection updates are O(m) where m is the
   * number of projections × number of unique aggregateIds in the event batch.
   *
   * @param events - Events to store
   * @param context - Security context with required tenantId
   * @throws {Error} If tenantId is invalid or event store operation fails
   */
  async storeEvents(
    events: readonly EventType[],
    context: EventStoreReadContext<EventType>,
  ): Promise<void> {
    return await this.tracer.withActiveSpan(
      "EventSourcingService.storeEvents",
      {
        kind: SpanKind.INTERNAL,
        attributes: {
          "aggregate.type": this.aggregateType,
          "event.count": events.length,
          "tenant.id": context.tenantId,
          "event.types": [...new Set(events.map((e) => e.type))].join(","),
        },
      },
      async (span) => {
        EventUtils.validateTenantId(context, "storeEvents");

        // Enrich events with trace context if missing (for debugging)
        const enrichedEvents = events.map((event) => {
          const enrichedMetadata =
            EventUtils.buildEventMetadataWithCurrentProcessingTraceparent(
              event.metadata,
            );
          if (enrichedMetadata === event.metadata) {
            return event;
          }
          return {
            ...event,
            metadata: enrichedMetadata,
          } as EventType;
        });

        span.addEvent("event_store.store.start");
        await this.eventStore.storeEvents(
          enrichedEvents,
          context,
          this.aggregateType,
        );
        span.addEvent("event_store.store.complete");

        // Publish events after successful storage
        if (this.eventPublisher && enrichedEvents.length > 0) {
          span.addEvent("publisher.publish.start");
          try {
            await this.eventPublisher.publish(enrichedEvents, context);
            span.addEvent("publisher.publish.complete");
          } catch (error) {
            span.addEvent("publisher.publish.error", {
              "error.message":
                error instanceof Error ? error.message : String(error),
            });
            // Log publishing errors but don't fail the storage operation
            if (this.logger) {
              this.logger.error(
                {
                  aggregateType: this.aggregateType,
                  eventCount: enrichedEvents.length,
                  error: error instanceof Error ? error.message : String(error),
                },
                "Failed to publish events to external system",
              );
            }
          }
        }

        // Dispatch events to handlers after successful storage
        if (this.eventHandlers && enrichedEvents.length > 0) {
          span.addEvent("handler.dispatch.start");
          await this.handlerDispatcher.dispatchEventsToHandlers(
            enrichedEvents,
            context,
          );
          span.addEvent("handler.dispatch.complete");
        }

        if (this.projections && enrichedEvents.length > 0) {
          span.addEvent("projection.update.start");
          await this.projectionUpdater.updateProjectionsForAggregates(
            enrichedEvents,
            context,
          );
          span.addEvent("projection.update.complete");
        }
      },
    );
  }

  /**
   * Updates a specific projection by name for a given aggregate.
   *
   * This method processes all events for the aggregate and updates the projection state.
   * Projections are automatically updated after events are stored via storeEvents(),
   * but this method can be used for manual updates (e.g., recovery or reprocessing).
   *
   * **Concurrency:** Uses distributed lock (if configured) to prevent concurrent updates of the same
   * aggregate projection. The lock key includes the projection name to ensure different projections
   * for the same aggregate can be updated concurrently, while the same projection is updated serially.
   * Lock key format: `update:${aggregateType}:${aggregateId}:${projectionName}`
   * If lock acquisition fails, throws an error (caller should retry via queue).
   * Without a distributed lock, concurrent updates may result in lost updates (last write wins).
   *
   * **Performance:** O(n) where n is the number of events for the aggregate. Lock acquisition adds
   * network latency if using Redis-based locks.
   *
   * **Failure Modes:**
   * - Throws if projection name not found
   * - Throws if no events found for aggregate
   * - Throws if distributed lock cannot be acquired (if lock is configured)
   * - Throws if tenantId is invalid
   * - Projection store errors propagate (not caught)
   *
   * @param projectionName - The name of the projection to update
   * @param aggregateId - The aggregate to update projection for
   * @param context - Security context with required tenantId for event store access
   * @param options - Optional options including projection store context override
   * @returns The updated projection
   * @throws {Error} If projection name not found, no events found, lock acquisition fails, or tenantId is invalid
   */
  async updateProjectionByName<ProjectionName extends string>(
    projectionName: ProjectionName,
    aggregateId: string,
    context: EventStoreReadContext<EventType>,
    options?: UpdateProjectionOptions<EventType>,
  ): Promise<any> {
    return await this.projectionUpdater.updateProjectionByName(
      projectionName,
      aggregateId,
      context,
      options,
    );
  }

  /**
   * Gets a specific projection by name for a given aggregate.
   *
   * @param projectionName - The name of the projection to retrieve
   * @param aggregateId - The aggregate to get projection for
   * @param context - Security context with required tenantId
   * @returns The projection, or null if not found
   * @throws Error if projection name not found or not configured
   */
  async getProjectionByName<ProjectionName extends string>(
    projectionName: ProjectionName,
    aggregateId: string,
    context: EventStoreReadContext<EventType>,
  ): Promise<unknown> {
    return await this.projectionUpdater.getProjectionByName(
      projectionName,
      aggregateId,
      context,
    );
  }

  /**
   * Checks if a specific projection exists for a given aggregate.
   *
   * @param projectionName - The name of the projection to check
   * @param aggregateId - The aggregate to check projection for
   * @param context - Security context with required tenantId
   * @returns True if the projection exists, false otherwise
   * @throws Error if projection name not found or not configured
   */
  async hasProjectionByName<ProjectionName extends string>(
    projectionName: ProjectionName,
    aggregateId: string,
    context: EventStoreReadContext<EventType>,
  ): Promise<boolean> {
    return await this.projectionUpdater.hasProjectionByName(
      projectionName,
      aggregateId,
      context,
    );
  }

  /**
   * Gets the list of available projection names.
   *
   * @returns Array of projection names
   */
  getProjectionNames(): string[] {
    return this.projectionUpdater.getProjectionNames();
  }

  /**
   * Replays events up to a specific timestamp (time travel) for a specific projection.
   *
   * This allows updating projections as they existed at a point in time.
   *
   * **Status:** Not implemented - always throws "Not implemented" error.
   *
   * **Intended Behavior (when implemented):**
   * - Filters events by timestamp (upToTimestamp)
   * - Rebuilds projection from filtered events
   * - Returns projection as it would have existed at that point in time
   *
   * @param projectionName - Name of the projection to replay
   * @param aggregateId - The aggregate to replay events for
   * @param context - Security context with required tenantId for event store access
   * @param options - Options including upToTimestamp and projection store context
   * @returns The projection as it would have existed at the specified timestamp
   * @throws {Error} Always throws "Not implemented" error
   *
   * @example
   * ```typescript
   * // Replay events up to a specific point in time
   * const projection = await service.replayEvents("trace-summary", "trace-123", context, {
   *   upToTimestamp: Date.parse("2024-01-15T10:00:00Z"),
   * });
   * ```
   */
  async replayEvents<ProjectionName extends string>(
    _projectionName: ProjectionName,
    _aggregateId: string,
    _context: EventStoreReadContext<EventType>,
    _options?: ReplayEventsOptions<EventType>,
  ): Promise<any> {
    throw new ConfigurationError(
      "EventSourcingService",
      "Method not implemented",
    );
  }

  /**
   * Replays events for a specific event handler.
   *
   * Useful for reprocessing events after handler changes or recovering from failures.
   *
   * **Status:** Not implemented - always throws "Not implemented" error.
   *
   * **Intended Behavior (when implemented):**
   * - Fetches events for the aggregate (optionally from a specific event ID)
   * - Re-executes the handler for each event
   * - Updates checkpoints after successful processing
   *
   * @param handlerName - Name of the handler to replay events for
   * @param aggregateId - The aggregate to replay events for
   * @param context - Security context with required tenantId
   * @param options - Options including fromEventId to start from a specific event
   * @returns Promise that resolves when replay is complete
   * @throws {Error} Always throws "Not implemented" error
   *
   * @example
   * ```typescript
   * // Replay all events for a handler
   * await service.replayEventsForHandler("clickhouse-writer", "trace-123", context);
   *
   * // Replay from a specific event ID
   * await service.replayEventsForHandler("clickhouse-writer", "trace-123", context, {
   *   fromEventId: "trace_123:1234567890:lw.obs.span_ingestion.recorded"
   * });
   * ```
   */
  async replayEventsForHandler(
    _handlerName: string,
    _aggregateId: string,
    _context: EventStoreReadContext<EventType>,
    _options?: {
      fromEventId?: string;
    },
  ): Promise<void> {
    throw new ConfigurationError(
      "EventSourcingService",
      "Method not implemented",
    );
  }

  /**
   * Gets the queue processor manager for this service.
   * Used by the pipeline builder to initialize command queues.
   */
  getQueueManager(): QueueProcessorManager<EventType> {
    return this.queueManager;
  }

  /**
   * Gracefully closes all queue processors for event handlers, projections, and commands.
   * Should be called during application shutdown to ensure all queued jobs complete.
   */
  async close(): Promise<void> {
    await this.queueManager.close();
  }
}
